package com.ks.demo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ configuration: declares the listener container factory used by the
 * point-task consumers and a {@link RabbitTemplate} wired with the producer-side
 * confirm and return callbacks (producer message confirmation mechanism).
 *
 * <p>Created 2022/9/27 23:21.
 *
 * @author kangsen
 * @version 1.0
 */
@Configuration
@Slf4j
public class RabbitMqConfig {
    /**
     * Number of concurrent consumers; defaults to 10.
     */
    public static final int DEFAULT_CONCURRENT = 10;
    /**
     * Maximum number of deliveries handed to each consumer (prefetch count); defaults to 50.
     */
    public static final int DEFAULT_PREFETCH_COUNT = 50;

    /**
     * Builds the listener container factory registered as {@code pointTaskContainerFactory}.
     *
     * @param configurer        Spring Boot configurer applied to the new factory
     * @param connectionFactory connection factory the listener containers will use
     * @return the configured container factory
     */
    @Bean("pointTaskContainerFactory")
    public SimpleRabbitListenerContainerFactory pintTaskContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(DEFAULT_PREFETCH_COUNT);
        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
        // NOTE(review): the configurer runs after the setters above, so presumably any
        // listener settings it carries from application properties take precedence over
        // these defaults — confirm that this ordering is intended.
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Creates the {@link RabbitTemplate} with confirm and return callbacks that log
     * the outcome of each publish.
     *
     * <p>Publish outcomes these callbacks are meant to surface:
     * <ol>
     *   <li>the message reached the server but the exchange was not found;</li>
     *   <li>the exchange was found but no queue was matched;</li>
     *   <li>neither the exchange nor a queue was found;</li>
     *   <li>the message was delivered successfully.</li>
     * </ol>
     *
     * @param connectionFactory connection factory used by the template
     * @return the configured template
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // Mandatory publishing is what lets unroutable messages come back through the
        // return callback. NOTE(review): the confirm callback additionally depends on
        // publisher confirms being enabled on the connection factory, which is configured
        // outside this class — verify.
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(RabbitMqConfig::logConfirm);
        rabbitTemplate.setReturnCallback(RabbitMqConfig::logReturnedMessage);
        return rabbitTemplate;
    }

    /**
     * Confirm callback: logs the broker's ack/nack for a published message.
     * Acks are logged at INFO; nacks are logged at WARN so that failed publishes
     * remain visible when INFO output is filtered.
     *
     * @param correlationData correlation data for the callback
     * @param ack             true for ack (the message reached the server), false for nack
     * @param cause           an optional cause, for nack, when available, otherwise null
     */
    private static void logConfirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("ConfirmCallback correlationData:{} ", correlationData);
            log.info("ConfirmCallback ack:{} ", ack);
            log.info("ConfirmCallback cause:{} ", cause);
            return;
        }
        log.warn("ConfirmCallback correlationData:{} ", correlationData);
        log.warn("ConfirmCallback ack:{} ", ack);
        log.warn("ConfirmCallback cause:{} ", cause);
    }

    /**
     * Return callback: logs a message that was handed back to the publisher.
     * Logged at WARN because a returned message was not delivered to any queue.
     *
     * @param message    the returned message
     * @param replyCode  the reply code
     * @param replyText  the reply text
     * @param exchange   the exchange
     * @param routingKey the routing key
     */
    private static void logReturnedMessage(Message message, int replyCode, String replyText,
                                           String exchange, String routingKey) {
        log.warn("returnedMessage message : {}" , message);
        log.warn("returnedMessage replyCode : {}" , replyCode);
        log.warn("returnedMessage replyText : {}" , replyText);
        log.warn("returnedMessage exchange : {}" , exchange);
        log.warn("returnedMessage routingKey : {}" , routingKey);
    }
}
